package com.iot.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
import com.iot.util.PropertiesReader;
import java.util.*;
import org.apache.kafka.clients.producer.*;
import com.iot.vo.IotData;


public class DataProducer {
    private static final Logger logger = Logger.getLogger(DataProducer.class);

    public static void main(String[] args)throws Exception{
        Properties prop = PropertiesReader.readPropertyFile();
        String servers = prop.getProperty("com.iot.bootstrap.servers");
        logger.info("Using servers = " + servers);
        prop.put("bootstrap.servers", servers); //kafka集群地址
        prop.put("acks", "all"); //acks它代表消息确认机制   // 1 0 -1 all
        prop.put("retries", 0); //重试的次数
        prop.put("batch.size", 16384); //批处理数据的大小，每次写入多少数据到topic
        prop.put("linger.ms", 1);
        prop.put("buffer.memory",33554422);
        prop.put("key.serializer", "com.iot.util.IotDataEncoder");
        prop.put("value.serializer", "com.iot.util.IotDataEncoder");
        String topic = prop.getProperty("com.iot.app.kafka.topic");

        Producer<String, IotData>producer = new KafkaProducer<String, IotData>(prop);
        DataProducer iotProducer = new DataProducer();
        iotProducer.generateIotEvent(producer,topic);
        producer.close();
    }

    private void generateIotEvent(Producer<String, IotData>producer,String topic)throws InterruptedException{
        List<String> routeList = Arrays.asList(new String[]{"Route-37", "Route-43", "Route-82"}); //数组转list
        List<String> vehicleTypeList = Arrays.asList(new String[]{"Large Truck", "Small Truck", "Private Car", "Bus", "Taxi"});
        Random rand = new Random();
        logger.info("Sending events");
        while(true){
            List<IotData>eventList = new ArrayList<IotData>();
            for (int i = 0;i < 100;i++){
                String vehicleId = UUID.randomUUID().toString();
                String vehicleType = vehicleTypeList.get(rand.nextInt(5));
                String routeId = routeList.get(rand.nextInt(3));
                Date timestamp = new Date();
                double speed = rand.nextInt(100-20) + 20;
                double fuelLevel = rand.nextInt(40 - 10) + 10;
                for (int j = 0;j < 5;j++){
                    String coords = getCoordinates(routeId);
                    String latitude = coords.substring(0,coords.indexOf(","));
                    String longitude = coords.substring(coords.indexOf(",") + 1, coords.length());
                    IotData event = new IotData(vehicleId, vehicleType, routeId, latitude, longitude, timestamp, speed,fuelLevel);
                    eventList.add(event);
                }
            }
            Collections.shuffle(eventList);
            for (IotData event:eventList){
                producer.send(new ProducerRecord<String, IotData>(topic,event),new Callback(){
                    public void onCompletion(RecordMetadata metadata,Exception exception){
                        if (exception == null){
                            logger.info("消息发送成功");
                        }else{
                            logger.info("消息发送失败");
                        }
                    }
                });
                Thread.sleep(rand.nextInt(30-10) + 30);
            }
        }
    }

    private String getCoordinates(String routeId){
        Random rand = new Random();
        int latPrefix = 0;
        int longPrefix = -0;
        if (routeId.equals("Route-37")) {
            latPrefix = 33;
            longPrefix = -96;
        } else if (routeId.equals("Route-82")) {
            latPrefix = 34;
            longPrefix = -97;
        } else if (routeId.equals("Route-43")) {
            latPrefix = 35;
            longPrefix = -98;
        }
        Float lati = latPrefix + rand.nextFloat();
        Float longi = longPrefix + rand.nextFloat();
        return lati + "," + longi;
    }
}
